iT邦幫忙

2025 iThome 鐵人賽

DAY 20
0
Rust

Rust 實戰專案集:30 個漸進式專案從工具到服務系列 第 20

資料庫遷移工具 - SQL 資料庫 schema 版本管理

  • 分享至 

  • xImage
  •  

前言

過去我們針對資料做處理,整理,清洗等,我們這次針對 sql 相關主題進行操作
今天我們針對 sql schema 進行管理。 我們將使用 Rust 打造一個類似 Flyway 或 Liquibase 的資料庫遷移工具,
幫助開發者自動化管理資料庫 schema 版本,當團隊規模擴大、專案複雜度提升時,
手動管理資料庫變更不僅容易出錯,也難以追蹤歷史變更

今日學習目標

  • 理解資料庫遷移的核心概念與最佳實踐
  • 使用 sqlx 進行資料庫操作
  • 實作 migration 檔案的掃描與排序機制
  • 設計版本控制表來追蹤遷移歷史
  • 處理遷移失敗的回滾機制

開始專案

cargo new db-migrator
cd db-migrator

依賴

Cargo.toml

[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "sqlite"] }
tokio = { version = "1.0", features = ["full"] }
clap = { version = "4.0", features = ["derive"] }
chrono = "0.4"
anyhow = "1.0"
regex = "1.0"
sha2 = "0.10"

設計

  1. Migration 檔案格式
    我們採用底下的命名規範
V{version}__{description}.sql

ex :

  • V001__create_users_table.sql
  • V002__add_email_index.sql
  • V003__create_posts_table.sql

版本控制表結構

CREATE TABLE IF NOT EXISTS schema_migrations (
    version VARCHAR(50) PRIMARY KEY,
    description TEXT NOT NULL,
    checksum VARCHAR(64) NOT NULL,
    executed_at TIMESTAMP NOT NULL,
    execution_time_ms INTEGER NOT NULL,
    success BOOLEAN NOT NULL
);

開始實作

Migration 定義結構

use chrono::{DateTime, Utc};
use std::path::PathBuf;

#[derive(Debug, Clone)]
pub struct Migration {
    pub version: String,
    pub description: String,
    pub file_path: PathBuf,
    pub sql_content: String,
    pub checksum: String,
}

#[derive(Debug, Clone)]
pub struct MigrationRecord {
    pub version: String,
    pub description: String,
    pub checksum: String,
    pub executed_at: DateTime<Utc>,
    pub execution_time_ms: i32,
    pub success: bool,
}

impl Migration {
    pub fn from_file(path: PathBuf) -> anyhow::Result<Self> {
        let filename = path
            .file_name()
            .and_then(|n| n.to_str())
            .ok_or_else(|| anyhow::anyhow!("Invalid filename"))?;

        // 解析檔名: V001__create_users_table.sql
        let re = regex::Regex::new(r"^V(\d+)__(.+)\.sql$")?;
        let caps = re
            .captures(filename)
            .ok_or_else(|| anyhow::anyhow!("Invalid migration filename format: {}", filename))?;

        let version = caps[1].to_string();
        let description = caps[2].replace('_', " ");

        let sql_content = std::fs::read_to_string(&path)?;
        let checksum = Self::calculate_checksum(&sql_content);

        Ok(Migration {
            version,
            description,
            file_path: path,
            sql_content,
            checksum,
        })
    }

    fn calculate_checksum(content: &str) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(content.as_bytes());
        format!("{:x}", hasher.finalize())
    }
}

這裡我們製作 Migrator 核心邏輯

use sqlx::{Pool, Postgres, Row};
use std::path::Path;
use std::time::Instant;

pub struct Migrator {
    pool: Pool<Postgres>,
    migrations_dir: PathBuf,
}

impl Migrator {
    pub async fn new(database_url: &str, migrations_dir: PathBuf) -> anyhow::Result<Self> {
        let pool = Pool::<Postgres>::connect(database_url).await?;
        Ok(Migrator {
            pool,
            migrations_dir,
        })
    }

    pub async fn init(&self) -> anyhow::Result<()> {
        println!("🔧 Initializing schema_migrations table...");
        
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version VARCHAR(50) PRIMARY KEY,
                description TEXT NOT NULL,
                checksum VARCHAR(64) NOT NULL,
                executed_at TIMESTAMP NOT NULL,
                execution_time_ms INTEGER NOT NULL,
                success BOOLEAN NOT NULL
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        println!("✅ Schema migrations table ready");
        Ok(())
    }

    pub async fn load_migrations(&self) -> anyhow::Result<Vec<Migration>> {
        let mut migrations = Vec::new();
        let entries = std::fs::read_dir(&self.migrations_dir)?;

        for entry in entries {
            let entry = entry?;
            let path = entry.path();

            if path.extension().and_then(|s| s.to_str()) == Some("sql") {
                match Migration::from_file(path.clone()) {
                    Ok(migration) => migrations.push(migration),
                    Err(e) => {
                        eprintln!("⚠️  Failed to parse migration {:?}: {}", path, e);
                    }
                }
            }
        }

        // 按版本號排序
        migrations.sort_by(|a, b| {
            a.version
                .parse::<i32>()
                .unwrap_or(0)
                .cmp(&b.version.parse::<i32>().unwrap_or(0))
        });

        Ok(migrations)
    }

    pub async fn get_applied_migrations(&self) -> anyhow::Result<Vec<MigrationRecord>> {
        let records = sqlx::query_as::<_, MigrationRecord>(
            "SELECT version, description, checksum, executed_at, execution_time_ms, success 
             FROM schema_migrations 
             ORDER BY version"
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(records)
    }

    pub async fn migrate(&self) -> anyhow::Result<()> {
        println!("🚀 Starting database migration...\n");

        let migrations = self.load_migrations().await?;
        let applied = self.get_applied_migrations().await?;
        let applied_versions: std::collections::HashSet<_> = 
            applied.iter().map(|r| r.version.clone()).collect();

        let pending: Vec<_> = migrations
            .iter()
            .filter(|m| !applied_versions.contains(&m.version))
            .collect();

        if pending.is_empty() {
            println!("✅ Database is up to date. No pending migrations.");
            return Ok(());
        }

        println!("📋 Found {} pending migration(s):\n", pending.len());
        for migration in &pending {
            println!("  V{} - {}", migration.version, migration.description);
        }
        println!();

        for migration in pending {
            self.apply_migration(migration).await?;
        }

        println!("\n🎉 All migrations completed successfully!");
        Ok(())
    }

    async fn apply_migration(&self, migration: &Migration) -> anyhow::Result<()> {
        println!("⏳ Applying V{} - {}...", migration.version, migration.description);

        let start = Instant::now();
        let mut tx = self.pool.begin().await?;

        match sqlx::query(&migration.sql_content).execute(&mut *tx).await {
            Ok(_) => {
                let duration = start.elapsed().as_millis() as i32;

                // 記錄成功的遷移
                sqlx::query(
                    r#"
                    INSERT INTO schema_migrations 
                    (version, description, checksum, executed_at, execution_time_ms, success)
                    VALUES ($1, $2, $3, $4, $5, $6)
                    "#,
                )
                .bind(&migration.version)
                .bind(&migration.description)
                .bind(&migration.checksum)
                .bind(Utc::now())
                .bind(duration)
                .bind(true)
                .execute(&mut *tx)
                .await?;

                tx.commit().await?;
                println!("✅ Applied V{} in {}ms", migration.version, duration);
                Ok(())
            }
            Err(e) => {
                tx.rollback().await?;
                println!("❌ Failed to apply V{}: {}", migration.version, e);
                Err(e.into())
            }
        }
    }

    pub async fn status(&self) -> anyhow::Result<()> {
        println!("📊 Migration Status\n");

        let migrations = self.load_migrations().await?;
        let applied = self.get_applied_migrations().await?;
        let applied_map: std::collections::HashMap<_, _> = 
            applied.iter().map(|r| (r.version.clone(), r)).collect();

        println!("{:<10} {:<40} {:<15} {:<20}", 
                 "Version", "Description", "Status", "Executed At");
        println!("{}", "-".repeat(85));

        for migration in migrations {
            if let Some(record) = applied_map.get(&migration.version) {
                let status = if record.success { "✅ Applied" } else { "❌ Failed" };
                println!(
                    "{:<10} {:<40} {:<15} {:<20}",
                    format!("V{}", migration.version),
                    &migration.description[..migration.description.len().min(38)],
                    status,
                    record.executed_at.format("%Y-%m-%d %H:%M:%S")
                );
            } else {
                println!(
                    "{:<10} {:<40} {:<15} {:<20}",
                    format!("V{}", migration.version),
                    &migration.description[..migration.description.len().min(38)],
                    "⏳ Pending",
                    "-"
                );
            }
        }

        Ok(())
    }
}

// 實作 sqlx::FromRow
impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MigrationRecord {
    fn from_row(row: &sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        Ok(MigrationRecord {
            version: row.try_get("version")?,
            description: row.try_get("description")?,
            checksum: row.try_get("checksum")?,
            executed_at: row.try_get("executed_at")?,
            execution_time_ms: row.try_get("execution_time_ms")?,
            success: row.try_get("success")?,
        })
    }
}

cli (main.rs)

use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "db-migrator")]
#[command(about = "Database migration tool for managing schema versions")]
struct Cli {
    #[command(subcommand)]
    command: Commands,

    #[arg(short, long, default_value = "postgres://localhost/mydb")]
    database_url: String,

    #[arg(short, long, default_value = "./migrations")]
    migrations_dir: String,
}

#[derive(Subcommand)]
enum Commands {
    /// Initialize the schema_migrations table
    Init,
    
    /// Apply all pending migrations
    Migrate,
    
    /// Show migration status
    Status,
    
    /// Create a new migration file
    Create {
        /// Description of the migration
        description: String,
    },
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();
    let migrations_dir = PathBuf::from(&cli.migrations_dir);

    // 確保 migrations 目錄存在
    std::fs::create_dir_all(&migrations_dir)?;

    match cli.command {
        Commands::Init => {
            let migrator = Migrator::new(&cli.database_url, migrations_dir).await?;
            migrator.init().await?;
        }
        Commands::Migrate => {
            let migrator = Migrator::new(&cli.database_url, migrations_dir).await?;
            migrator.init().await?;
            migrator.migrate().await?;
        }
        Commands::Status => {
            let migrator = Migrator::new(&cli.database_url, migrations_dir).await?;
            migrator.status().await?;
        }
        Commands::Create { description } => {
            create_migration(&migrations_dir, &description)?;
        }
    }

    Ok(())
}

fn create_migration(dir: &Path, description: &str) -> anyhow::Result<()> {
    let existing = std::fs::read_dir(dir)?
        .filter_map(|e| e.ok())
        .filter_map(|e| {
            let filename = e.file_name().to_string_lossy().to_string();
            if filename.starts_with('V') && filename.ends_with(".sql") {
                filename[1..4].parse::<i32>().ok()
            } else {
                None
            }
        })
        .max()
        .unwrap_or(0);

    let next_version = existing + 1;
    let sanitized_desc = description.replace(' ', "_").to_lowercase();
    let filename = format!("V{:03}__{}.sql", next_version, sanitized_desc);
    let filepath = dir.join(&filename);

    let template = format!(
        "-- Migration: {}\n-- Created: {}\n\n-- Add your SQL here\n\n",
        description,
        Utc::now().format("%Y-%m-%d %H:%M:%S")
    );

    std::fs::write(&filepath, template)?;
    println!("✅ Created migration: {}", filename);
    println!("📝 Edit file: {}", filepath.display());

    Ok(())
}

打完收工!

開始嘗試

step 1 : 這裏初始化

cargo run -- init --database-url postgres://user:pass@localhost/mydb

step 2 : 建立新的 migration

cargo run -- create "create users table"

這會生成 migrations/V001__create_users_table.sql

-- Migration: create users table
-- Created: 2025-10-04 10:30:00

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

step3 :執行 migrate

cargo run -- migrate

step4 : 查看狀態

cargo run -- status

上一篇
JSON Schema 驗證器 - 驗證 JSON 資料格式
下一篇
股價追蹤器 - 抓取股價資料並計算技術指標
系列文
Rust 實戰專案集:30 個漸進式專案從工具到服務24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言